package com.opensource.msgui.ctl.demo.controller.v1.service;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

/**
 * TransactionStatus.CommitTransaction ：消息提交，当消息状态为 CommitTransaction ，表示允许消费者允许消费当前消息
 * TransactionStatus.RollbackTransaction ：消息回滚，表示 MQ 服务端将会删除当前半消息，不允许消费者消费。
 * TransactionStatus.Unknown ：中间状态，表示 MQ 服务需要发起回查操作，检测当前发送方本地事务的执行状态。
 */
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    /**
     * 消息生产者需要在 executeLocalTransaction 中执行本地事务 , 当事务半消息提交成功，执行完毕后需要返回事务状态码。
     *
     * @param msg
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o) {
        Object num = msg.getHeaders().get("test");
        if ("1".equals(num)) {
            System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
            return RocketMQLocalTransactionState.UNKNOWN; // 将会导致再次查询本地事务
        } else if ("2".equals(num)) {
            System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
            return RocketMQLocalTransactionState.ROLLBACK; // 半消息将会被 mq 服务器删除，并且消费者不会消费到该消息
        }

        System.out.println("executer: " + new String((byte[]) msg.getPayload()) + "commit");
        return RocketMQLocalTransactionState.COMMIT; // 半消息提交，消费者会消费到该消息。
    }

    /**
     * 实现 checkLocalTransaction 方法，该方法用于进行本地事务执行情况回查，并回应事务状态给MQ 的 broker,
     * 执行完成之后需要返回对应的事务状态码
     *
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("check: " + new String((byte[]) message.getPayload()));
        return RocketMQLocalTransactionState.COMMIT;
    }
}